package org.hope.lee.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.sun.org.apache.xpath.internal.SourceTree;

public class Consumer {
	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer");
		consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
		/** 批量消费,每次拉取10条,默认1条 */
		consumer.setConsumeMessageBatchMaxSize(10);// ①
		// consumer.setInstanceName("quick_start_consumer");
		// 3.2.6这个版本没有这个方法，3.5.3版本要设置这个方法为false,否则取不到topic
		// consumer.setVipChannelEnabled(false);

		// 程序第一次启动从消息队列头取数据
		// 如果非第一次启动，那么按照上次消费的位置继续消费
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		/** 消费线程池最小数量，默认10 */
		consumer.setConsumeThreadMin(10);
		/** 消费线程池最大数量，默认20 */
		consumer.setConsumeThreadMax(20);
		/** 拉消息本地队列缓存消息最大数：1000 */
		// consumer.setPullThresholdForQueue(1000);
		/** 批量拉消息，一次拉取多少条，默认32条 */
		// consumer.setPullBatchSize(32);
		/**消息拉取线程，每隔多久拉一次，默认为0*/
		//consumer.setPullInterval(0);
		// 订阅PushTopic下Tag为push的消息
		consumer.subscribe("PushTopic_tt1", "*");
		// consumer.subscribe("PushTopic_tt1", "TagA || Tag B || Tage C");
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				// 如果是先启动Consumer端后启动Producer端虽然在①出设置了批量消费，其实这里还是一条一条的在拉取，所以这里的for循环是个误导
				// 直接使用 MessageExt msg = msgs.get(0)
				// 就可以。But如果是先启动Producer端后启动Consumer端，那这个单数就是有用的了
				// System.out.println("消息条数:" + msgs.size());
				// MessageExt msg = msgs.get(0);
				try {
					for (MessageExt msg : msgs) {
						System.out.println("-------->" + msg.getKeys());
						System.out.println("-------->" + msg.getMsgId());
						System.out.println("-------->" + msg.getQueueId());
						System.out.println("-------->" + msg.getQueueOffset());
						System.out.println("-------->" + msg.getBody().toString());
						System.out.println("-------->" + msg.toString());
					}
				} catch (Exception e) {
					e.printStackTrace();
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		consumer.start();
		System.out.println("Consumer Started.");
		// consumer.suspend();

	}
}
